The problem
|
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
|
suspend func massiveRun(action: suspend () -> Unit) {
let n = 100 // number of coroutines to launch
let k = 1000 // times an action is repeated by each coroutine
let time = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
print("Completed ${n * k} actions in $time ms")
}
|
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
|
var counter = 0
func main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
print("Counter = $counter")
}
|
Volatiles are of no help
|
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
|
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0
func main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
print("Counter = $counter")
}
|
Thread-safe data structures
|
val counter = AtomicInteger()
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.incrementAndGet()
}
}
println("Counter = $counter")
}
|
let counter = AtomicInteger()
func main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.incrementAndGet()
}
}
print("Counter = $counter")
}
|
Thread confinement fine-grained
|
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// confine each increment to a single-threaded context
withContext(counterContext) {
counter++
}
}
}
println("Counter = $counter")
}
|
let counterContext = newSingleThreadContext("CounterContext")
var counter = 0
func main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// confine each increment to a single-threaded context
withContext(counterContext) {
counter++
}
}
}
print("Counter = $counter")
}
|
Thread confinement coarse-grained
|
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
// confine everything to a single-threaded context
withContext(counterContext) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
|
let counterContext = newSingleThreadContext("CounterContext")
var counter = 0
func main() = runBlocking {
// confine everything to a single-threaded context
withContext(counterContext) {
massiveRun {
counter++
}
}
print("Counter = $counter")
}
|
Mutual exclusion
|
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// protect each increment with lock
mutex.withLock {
counter++
}
}
}
println("Counter = $counter")
}
|
let mutex = Mutex()
var counter = 0
func main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// protect each increment with lock
mutex.withLock {
counter++
}
}
}
print("Counter = $counter")
}
|
Actors
|
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
|
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(let response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
|
// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
|
// This function launches a new counter actor
func CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
|
fun main() = runBlocking<Unit> {
val counter = counterActor() // create the actor
withContext(Dispatchers.Default) {
massiveRun {
counter.send(IncCounter)
}
}
// send a message to get a counter value from an actor
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // shutdown the actor
}
|
func main() = runBlocking<Unit> {
let counter = counterActor() // create the actor
withContext(Dispatchers.Default) {
massiveRun {
counter.send(IncCounter)
}
}
// send a message to get a counter value from an actor
let response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
print("Counter = ${response.await()}")
counter.close() // shutdown the actor
}
|